{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query Planning Workflow\n",
    "\n",
    "In this notebook, we'll walk through an example of a query planning workflow.\n",
    "\n",
    "This workflow is useful for any system that needs iterative planning to answer a user's query, as it decomposes a query into smaller steps, executes those steps, and aggregates the results.\n",
    "\n",
    "Once a plan is executed, we can use the results to form a final response to the user's query or to form a new query plan if the current plan was not sufficient to answer the query."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "We will use OpenAI models, as well as llama-parse to load and parse documents."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "os.environ[\"OPENAI_API_KEY\"] = \"sk-proj-...\"\n",
    "os.environ[\"LLAMA_CLOUD_API_KEY\"] = \"llx-...\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!mkdir -p \"./data/sf_budgets/\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/xt3squt47djba0j7emmjb/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf?rlkey=xs064cjs8cb4wma6t5pw2u2bl&dl=0\" -O \"./data/sf_budgets/2016 - CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/jvw59g5nscu1m7f96tjre/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf?rlkey=v988oigs2whtcy87ti9wti6od&dl=0\" -O \"./data/sf_budgets/2017 - 2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/izknlwmbs7ia0lbn7zzyx/2018-o0181-18.pdf?rlkey=p5nv2ehtp7272ege3m9diqhei&dl=0\" -O \"./data/sf_budgets/2018 - 2018-o0181-18.pdf\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/1rstqm9rh5u5fr0tcjnxj/2019-Proposed-Budget-FY2019-20-FY2020-21.pdf?rlkey=3s2ivfx7z9bev1r840dlpbcgg&dl=0\" -O \"./data/sf_budgets/2019 - 2019-Proposed-Budget-FY2019-20-FY2020-21.pdf\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/7teuwxrjdyvgw0n8jjvk0/2021-AAO-FY20-21-FY21-22-09-11-2020-FINAL.pdf?rlkey=6br3wzxwj5fv1f1l8e69nbmhk&dl=0\" -O \"./data/sf_budgets/2021 - 2021-AAO-FY20-21-FY21-22-09-11-2020-FINAL.pdf\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/zhgqch4n6xbv9skgcknij/2022-AAO-FY2021-22-FY2022-23-FINAL-20210730.pdf?rlkey=h78t65dfaz3mqbpbhl1u9e309&dl=0\" -O \"./data/sf_budgets/2022 - 2022-AAO-FY2021-22-FY2022-23-FINAL-20210730.pdf\"\n",
    "!wget \"https://www.dropbox.com/scl/fi/vip161t63s56vd94neqlt/2023-CSF_Proposed_Budget_Book_June_2023_Master_Web.pdf?rlkey=hemoce3w1jsuf6s2bz87g549i&dl=0\" -O \"./data/sf_budgets/2023 - 2023-CSF_Proposed_Budget_Book_June_2023_Master_Web.pdf\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Workflow Definition\n",
    "\n",
    "### Workflow Events\n",
    "\n",
    "Since `Event` objects in workflows are just Pydantic models, we can use the function calling capabilities of OpenAI to dynamically define the execution of our workflow at runtime.\n",
    "\n",
    "By predicting events, we are predicting the next step(s) in our workflow to run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pydantic import BaseModel, Field\n",
    "from llama_index.core.workflow import Event\n",
    "\n",
    "\n",
    "class QueryPlanItem(Event):\n",
    "    \"\"\"A single step in an execution plan for a RAG system.\"\"\"\n",
    "\n",
    "    name: str = Field(description=\"The name of the tool to use.\")\n",
    "    query: str = Field(\n",
    "        description=\"A natural language search query for a RAG system.\"\n",
    "    )\n",
    "\n",
    "\n",
    "class QueryPlan(BaseModel):\n",
    "    \"\"\"A plan for a RAG system. After running the plan, we should have either enough information to answer the user's original query, or enough information to form a new query plan.\"\"\"\n",
    "\n",
    "    items: list[QueryPlanItem] = Field(\n",
    "        description=\"A list of the QueryPlanItem objects in the plan.\"\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In addition to the query plan, we also need some workflow events to collect the results of the query plan items."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class QueryPlanItemResult(Event):\n",
    "    \"\"\"The result of a query plan item\"\"\"\n",
    "\n",
    "    query: str\n",
    "    result: str\n",
    "\n",
    "\n",
    "class ExecutedPlanEvent(Event):\n",
    "    \"\"\"The result of a query plan\"\"\"\n",
    "\n",
    "    result: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Workflow Definition\n",
    "\n",
    "Now we can define our workflow. We will use an iterative process where we plan, execute, aggregate, and decide in an loop, until we have a final answer or a new query plan."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.workflow import (\n",
    "    Workflow,\n",
    "    StopEvent,\n",
    "    StartEvent,\n",
    "    Context,\n",
    "    step,\n",
    ")\n",
    "from llama_index.core.prompts import PromptTemplate\n",
    "from llama_index.llms.openai import OpenAI\n",
    "\n",
    "\n",
    "class QueryPlanningWorkflow(Workflow):\n",
    "    llm = OpenAI(model=\"gpt-4o\")\n",
    "    planning_prompt = PromptTemplate(\n",
    "        \"Think step by step. Given an initial query, as well as information about the indexes you can query, return a plan for a RAG system.\\n\"\n",
    "        \"The plan should be a list of QueryPlanItem objects, where each object contains a query.\\n\"\n",
    "        \"The result of executing an entire plan should provide a result that is a substantial answer to the initial query, \"\n",
    "        \"or enough information to form a new query plan.\\n\"\n",
    "        \"Sources you can query: {context}\\n\"\n",
    "        \"Initial query: {query}\\n\"\n",
    "        \"Plan:\"\n",
    "    )\n",
    "    decision_prompt = PromptTemplate(\n",
    "        \"Given the following information, return a final response that satisfies the original query, or return 'PLAN' if you need to continue planning.\\n\"\n",
    "        \"Original query: {query}\\n\"\n",
    "        \"Current results: {results}\\n\"\n",
    "    )\n",
    "\n",
    "    @step\n",
    "    async def planning_step(\n",
    "        self, ctx: Context, ev: StartEvent | ExecutedPlanEvent\n",
    "    ) -> QueryPlanItem | StopEvent:\n",
    "        if isinstance(ev, StartEvent):\n",
    "            # Initially, we need to plan\n",
    "            query = ev.get(\"query\")\n",
    "\n",
    "            tools = ev.get(\"tools\")\n",
    "\n",
    "            await ctx.store.set(\"tools\", {t.metadata.name: t for t in tools})\n",
    "            await ctx.store.set(\"original_query\", query)\n",
    "\n",
    "            context_str = \"\\n\".join(\n",
    "                [\n",
    "                    f\"{i+1}. {tool.metadata.name}: {tool.metadata.description}\"\n",
    "                    for i, tool in enumerate(tools)\n",
    "                ]\n",
    "            )\n",
    "            await ctx.store.set(\"context\", context_str)\n",
    "\n",
    "            query_plan = await self.llm.astructured_predict(\n",
    "                QueryPlan,\n",
    "                self.planning_prompt,\n",
    "                context=context_str,\n",
    "                query=query,\n",
    "            )\n",
    "\n",
    "            ctx.write_event_to_stream(\n",
    "                Event(msg=f\"Planning step: {query_plan}\")\n",
    "            )\n",
    "\n",
    "            num_items = len(query_plan.items)\n",
    "            await ctx.store.set(\"num_items\", num_items)\n",
    "            for item in query_plan.items:\n",
    "                ctx.send_event(item)\n",
    "        else:\n",
    "            # If we've already gone through planning and executing, we need to decide\n",
    "            # if we should continue planning or if we can stop and return a result.\n",
    "            query = await ctx.store.get(\"original_query\")\n",
    "            current_results_str = ev.result\n",
    "\n",
    "            decision = await self.llm.apredict(\n",
    "                self.decision_prompt,\n",
    "                query=query,\n",
    "                results=current_results_str,\n",
    "            )\n",
    "\n",
    "            # Simple string matching to see if we need to keep planning or if we can stop.\n",
    "            if \"PLAN\" in decision:\n",
    "                context_str = await ctx.store.get(\"context\")\n",
    "                query_plan = await self.llm.astructured_predict(\n",
    "                    QueryPlan,\n",
    "                    self.planning_prompt,\n",
    "                    context=context_str,\n",
    "                    query=query,\n",
    "                )\n",
    "\n",
    "                ctx.write_event_to_stream(\n",
    "                    Event(msg=f\"Re-Planning step: {query_plan}\")\n",
    "                )\n",
    "\n",
    "                num_items = len(query_plan.items)\n",
    "                await ctx.store.set(\"num_items\", num_items)\n",
    "                for item in query_plan.items:\n",
    "                    ctx.send_event(item)\n",
    "            else:\n",
    "                return StopEvent(result=decision)\n",
    "\n",
    "    @step(num_workers=4)\n",
    "    async def execute_item(\n",
    "        self, ctx: Context, ev: QueryPlanItem\n",
    "    ) -> QueryPlanItemResult:\n",
    "        tools = await ctx.store.get(\"tools\")\n",
    "        tool = tools[ev.name]\n",
    "\n",
    "        ctx.write_event_to_stream(\n",
    "            Event(\n",
    "                msg=f\"Querying tool {tool.metadata.name} with query: {ev.query}\"\n",
    "            )\n",
    "        )\n",
    "\n",
    "        result = await tool.acall(ev.query)\n",
    "\n",
    "        ctx.write_event_to_stream(\n",
    "            Event(msg=f\"Tool {tool.metadata.name} returned: {result}\")\n",
    "        )\n",
    "\n",
    "        return QueryPlanItemResult(query=ev.query, result=str(result))\n",
    "\n",
    "    @step\n",
    "    async def aggregate_results(\n",
    "        self, ctx: Context, ev: QueryPlanItemResult\n",
    "    ) -> ExecutedPlanEvent:\n",
    "        # We need to collect the results of the query plan items to aggregate them.\n",
    "        num_items = await ctx.store.get(\"num_items\")\n",
    "        results = ctx.collect_events(ev, [QueryPlanItemResult] * num_items)\n",
    "\n",
    "        # collect_events returns None if not all events were found\n",
    "        # return and wait for the remaining events to come in.\n",
    "        if results is None:\n",
    "            return\n",
    "\n",
    "        aggregated_result = \"\\n------\\n\".join(\n",
    "            [\n",
    "                f\"{i+1}. {result.query}: {result.result}\"\n",
    "                for i, result in enumerate(results)\n",
    "            ]\n",
    "        )\n",
    "        return ExecutedPlanEvent(result=aggregated_result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Loading Data\n",
    "\n",
    "Here, we use `llama-parse` to load and parse documents, and create an index for each year's budget."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_parse import LlamaParse\n",
    "\n",
    "parser = LlamaParse(fast_mode=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core import (\n",
    "    VectorStoreIndex,\n",
    "    StorageContext,\n",
    "    load_index_from_storage,\n",
    ")\n",
    "from llama_index.core.tools import QueryEngineTool\n",
    "\n",
    "folder = \"./data/sf_budgets/\"\n",
    "files = os.listdir(folder)\n",
    "\n",
    "query_engine_tools = []\n",
    "for file in files:\n",
    "    year = file.split(\" - \")[0]\n",
    "    index_persist_path = f\"./storage/budget-{year}/\"\n",
    "\n",
    "    if os.path.exists(index_persist_path):\n",
    "        storage_context = StorageContext.from_defaults(\n",
    "            persist_dir=index_persist_path\n",
    "        )\n",
    "        index = load_index_from_storage(storage_context)\n",
    "    else:\n",
    "        documents = await parser.aload_data(folder + file)\n",
    "        index = VectorStoreIndex.from_documents(documents)\n",
    "        index.storage_context.persist(index_persist_path)\n",
    "\n",
    "    engine = index.as_query_engine()\n",
    "    query_engine_tools.append(\n",
    "        QueryEngineTool.from_defaults(\n",
    "            engine,\n",
    "            name=f\"budget_{year}\",\n",
    "            description=f\"Information about San Francisco's budget in {year}\",\n",
    "        )\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Testing out the Workflow\n",
    "\n",
    "Let's test out our workflow with a few queries.\n",
    "\n",
    "Since we wrote a few stream events, we can see the execution of the workflow as it runs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "workflow = QueryPlanningWorkflow(verbose=False, timeout=120)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Planning step: items=[QueryPlanItem(name='budget_2016', query=\"What was the total amount of San Francisco's budget in 2016?\"), QueryPlanItem(name='budget_2017', query=\"What was the total amount of San Francisco's budget in 2017?\"), QueryPlanItem(name='budget_2018', query=\"What was the total amount of San Francisco's budget in 2018?\"), QueryPlanItem(name='budget_2019', query=\"What was the total amount of San Francisco's budget in 2019?\"), QueryPlanItem(name='budget_2021', query=\"What was the total amount of San Francisco's budget in 2021?\"), QueryPlanItem(name='budget_2022', query=\"What was the total amount of San Francisco's budget in 2022?\"), QueryPlanItem(name='budget_2023', query=\"What was the total amount of San Francisco's budget in 2023?\")]\n",
      "Querying tool budget_2016 with query: What was the total amount of San Francisco's budget in 2016?\n",
      "Querying tool budget_2017 with query: What was the total amount of San Francisco's budget in 2017?\n",
      "Querying tool budget_2018 with query: What was the total amount of San Francisco's budget in 2018?\n",
      "Querying tool budget_2019 with query: What was the total amount of San Francisco's budget in 2019?\n",
      "Tool budget_2019 returned: The total amount of San Francisco's budget in 2019 was $12.3 billion.\n",
      "Querying tool budget_2021 with query: What was the total amount of San Francisco's budget in 2021?\n",
      "Tool budget_2018 returned: The total amount of San Francisco's budget in 2018 was $2,169,893 in thousands of dollars.\n",
      "Querying tool budget_2022 with query: What was the total amount of San Francisco's budget in 2022?\n",
      "Tool budget_2017 returned: The total amount of San Francisco's budget in 2017 was $10.1 billion.\n",
      "Querying tool budget_2023 with query: What was the total amount of San Francisco's budget in 2023?\n",
      "Tool budget_2016 returned: The total amount of San Francisco's budget in 2016 was $9.6 billion.\n",
      "Tool budget_2021 returned: The total amount of San Francisco's budget in 2021 was $126,960,000.\n",
      "Tool budget_2022 returned: The total amount of San Francisco's budget in 2022 was $13,248,709,511.\n",
      "Tool budget_2023 returned: The total amount of San Francisco's budget in 2023 was $14.613 billion.\n"
     ]
    }
   ],
   "source": [
    "# run the workflow\n",
    "handler = workflow.run(\n",
    "    query=\"How has the total amount of San Francisco's budget changed from 2016 to 2023?\",\n",
    "    tools=query_engine_tools,\n",
    ")\n",
    "\n",
    "# stream the events as they come in\n",
    "async for event in handler.stream_events():\n",
    "    if hasattr(event, \"msg\"):\n",
    "        print(event.msg)\n",
    "\n",
    "# get the final result\n",
    "result = await handler"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The total amount of San Francisco's budget has changed from $9.6 billion in 2016 to $14.613 billion in 2023. This represents an increase of $5.013 billion over the seven-year period.\n"
     ]
    }
   ],
   "source": [
    "print(str(result))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Planning step: items=[QueryPlanItem(name='budget_2023', query='What were the major spending categories in the San Francisco 2023 budget?'), QueryPlanItem(name='budget_2016', query='What were the major spending categories in the San Francisco 2016 budget?')]\n",
      "Querying tool budget_2023 with query: What were the major spending categories in the San Francisco 2023 budget?\n",
      "Querying tool budget_2016 with query: What were the major spending categories in the San Francisco 2016 budget?\n",
      "Tool budget_2016 returned: Public Protection, Public Works, Transportation & Commerce, Human Welfare & Neighborhood Development, Community Health, Culture & Recreation, General Administration & Finance, General City Responsibilities.\n",
      "Tool budget_2023 returned: The major spending categories in the San Francisco 2023 budget were Community Health, Culture & Recreation, General Administration & Finance, General City Responsibilities, Human Welfare & Neighborhood Development, Public Protection, and Public Works, Transportation & Commerce.\n"
     ]
    }
   ],
   "source": [
    "# run the workflow again, with a new query\n",
    "handler = workflow.run(\n",
    "    query=\"What were the major spending categories in the 2023 budget vs. 2016?\",\n",
    "    tools=query_engine_tools,\n",
    ")\n",
    "\n",
    "# stream the events as they come in\n",
    "async for event in handler.stream_events():\n",
    "    if hasattr(event, \"msg\"):\n",
    "        print(event.msg)\n",
    "\n",
    "# get the final result\n",
    "result = await handler"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The major spending categories in the San Francisco 2023 budget were Community Health, Culture & Recreation, General Administration & Finance, General City Responsibilities, Human Welfare & Neighborhood Development, Public Protection, and Public Works, Transportation & Commerce. In comparison, the major spending categories in the 2016 budget were Public Protection, Public Works, Transportation & Commerce, Human Welfare & Neighborhood Development, Community Health, Culture & Recreation, General Administration & Finance, and General City Responsibilities.\n"
     ]
    }
   ],
   "source": [
    "print(str(result))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llama-index-caVs7DDe-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
